package com.heima.schedule.service.impl;

import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.heima.common.constants.ScheduleConstants;
import com.heima.common.exception.CustomException;
import com.heima.common.redis.CacheService;
import com.heima.model.common.enums.AppHttpCodeEnum;
import com.heima.model.schedule.dtos.Task;
import com.heima.model.schedule.pojos.Taskinfo;
import com.heima.model.schedule.pojos.TaskinfoLogs;
import com.heima.schedule.mapper.TaskinfoLogsMapper;
import com.heima.schedule.mapper.TaskinfoMapper;
import com.heima.schedule.service.TaskService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.StringUtils;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Date;
import java.util.List;
import java.util.Set;

/**
 * @author CC
 * time 2024-05-08
 * description
 */
@Service
@Slf4j
@Transactional
public class TaskServiceImpl extends ServiceImpl<TaskinfoMapper , Taskinfo> implements TaskService {

    @Autowired
    private TaskinfoMapper taskinfoMapper;
    @Autowired
    private TaskinfoLogsMapper taskinfoLogsMapper;
    @Autowired
    private CacheService cacheService;

    @Override
    public long addTask(Task task) {
        boolean flag = true;
        try {
            Taskinfo taskinfo = new Taskinfo();
            BeanUtils.copyProperties(task,taskinfo);
            taskinfo.setExecuteTime(new Date(task.getExecuteTime()));
            taskinfoMapper.insert(taskinfo);
            task.setTaskId(taskinfo.getTaskId());
            TaskinfoLogs taskinfoLogs = new TaskinfoLogs();
            BeanUtils.copyProperties(taskinfo,taskinfoLogs);
            taskinfoLogs.setVersion(1);
            taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);
            taskinfoLogsMapper.insert(taskinfoLogs);
            flag=true;
        }catch (Exception e){
            log.error("addTask error:{}",e.getMessage());
            flag=false;
        }
        if(!flag)return 0;
        String key = task.getTaskType()+"_"+task.getPriority();
        long executeTime = task.getExecuteTime();
        long l = System.currentTimeMillis();
        //加入立刻执行队列
        if (executeTime<=l) {
            cacheService.lLeftPush(ScheduleConstants.TOPIC+key, JSON.toJSONString(task));
        }else {
            //若当前时间小于预设时间,则加入延迟等待队列
            LocalDateTime localDateTime = LocalDateTime.now().plusMinutes(5);
            long epochSecond = localDateTime.toEpochSecond(ZoneOffset.ofHours(8));
            if(executeTime<epochSecond){
               cacheService.zAdd(ScheduleConstants.FUTURE+key,JSON.toJSONString(task),executeTime);
            }
        }
        return task.getTaskId();
    }

    @Override
    public boolean cancelTask(long taskId) {
        if (taskId<=0) {
            throw new CustomException(AppHttpCodeEnum.PARAM_INVALID);
        }
        //查询数据库
        TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectOne(Wrappers.lambdaQuery(TaskinfoLogs.class)
                .eq(TaskinfoLogs::getTaskId, taskId));
        if (taskinfoLogs==null) {
            throw new CustomException(AppHttpCodeEnum.DATA_NOT_EXIST);
        }
        if (taskinfoLogs.getTaskType()== ScheduleConstants.CANCELLED) {
            return true;
        }
        if(taskinfoLogs.getTaskType()==ScheduleConstants.EXECUTED){
            return false;
        }
        //删除数据库记录和缓存记录
        taskinfoMapper.delete(Wrappers.lambdaQuery(Taskinfo.class).eq(Taskinfo::getTaskId, taskId));
        taskinfoLogs.setTaskType(ScheduleConstants.CANCELLED);
        taskinfoLogsMapper.updateById(taskinfoLogs);
        Task task = new Task();
        task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime());
        BeanUtils.copyProperties(taskinfoLogs,task);
        //删除缓存
        String key = taskinfoLogs.getTaskType() + "_" + taskinfoLogs.getPriority();
        if (taskinfoLogs.getExecuteTime().getTime()<=System.currentTimeMillis()) {
            cacheService.lRemove(ScheduleConstants.TOPIC+key,0,JSON.toJSONString(task));
        }else {
            cacheService.zRemove(ScheduleConstants.FUTURE+key, JSON.toJSONString(task));
        }
        return true;
    }

    @Override
    public Task poll(int type, int priority) {

        //拉取redis任务
        String key = type + "_" + priority;
        String json = cacheService.lRightPop(ScheduleConstants.TOPIC + key);
        if(StringUtils.hasText(json)){
            Task task = JSON.parseObject(json, Task.class);
            //修改数据库信息
            Long taskId = task.getTaskId();
            TaskinfoLogs taskinfoLogs = new TaskinfoLogs();
            taskinfoLogs.setTaskType(ScheduleConstants.EXECUTED);
            taskinfoLogs.setTaskId(taskId);
            taskinfoMapper.deleteById(taskId);
            taskinfoLogsMapper.update(taskinfoLogs,Wrappers.lambdaUpdate(TaskinfoLogs.class)
                    .eq(TaskinfoLogs::getTaskId,taskId).set(TaskinfoLogs::getTaskType,ScheduleConstants.EXECUTED));
            return task;
        }
        return null;
    }

    @Scheduled(cron = "0 */1 * * * ?")
    public void refreshTask(){
        String rt = cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30);
        if (!StringUtils.hasText(rt)) {
            return;
        }
        Set<String> scan = cacheService.scan(ScheduleConstants.FUTURE + "_*");
        if (!CollectionUtils.isEmpty(scan)) {
            for (String s : scan) {
                String topKey  =ScheduleConstants.TOPIC+s.substring(ScheduleConstants.FUTURE.length());
                Set<String> tasks = cacheService.zRangeByScore(s, 0, System.currentTimeMillis());
                //同步数据
                if (!CollectionUtils.isEmpty(tasks)) {
                    cacheService.refreshWithPipeline(s,topKey,tasks);
                }
            }
        }
    }


    /**
     * 定时任务 ： 定时将数据库任务加载到redis中
     */
    @Scheduled(cron = "0 */5 * * * ?")
    public void reloadData(){
        clearCache();
        List<Taskinfo> taskinfos = taskinfoMapper.selectList(Wrappers.lambdaQuery(Taskinfo.class)
                .eq(Taskinfo::getTaskType, ScheduleConstants.SCHEDULED)
                .le(Taskinfo::getExecuteTime, LocalDateTime.now().plusMinutes(5).toEpochSecond(ZoneOffset.ofHours(8))));

        if (!CollectionUtils.isEmpty(taskinfos)) {
            taskinfos.stream().map(t->{
                Task task = new Task();
                BeanUtils.copyProperties(t,task);
                task.setExecuteTime(t.getExecuteTime().getTime());
                return task;
            }).forEach(task->{
                String key = task.getTaskType()+"_"+task.getPriority();
                long executeTime = task.getExecuteTime();
                long l = System.currentTimeMillis();
                //加入立刻执行队列
                if (executeTime<=l) {
                    cacheService.lLeftPush(ScheduleConstants.TOPIC+key, JSON.toJSONString(task));
                }else {
                    //若当前时间小于预设时间,则加入延迟等待队列
                    LocalDateTime localDateTime = LocalDateTime.now().plusMinutes(5);
                    long epochSecond = localDateTime.toEpochSecond(ZoneOffset.ofHours(8));
                    if(executeTime<epochSecond){
                        cacheService.zAdd(ScheduleConstants.FUTURE+key,JSON.toJSONString(task),executeTime);
                    }
                }
            });
        }

    }
    public void clearCache(){
        Set<String> scan = cacheService.scan(ScheduleConstants.TOPIC + "*");
        Set<String> scan1 = cacheService.scan(ScheduleConstants.FUTURE + "*");
        cacheService.delete(scan);
        cacheService.delete(scan1);
    }


}
